// Copyright Epic Games, Inc. All Rights Reserved.

#include "compactcas.h"

#include "cas.h"

#include <zencore/compress.h>
#include <zencore/except.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/iobuffer.h>
#include <zencore/logging.h>
#include <zencore/scopeguard.h>
#include <zencore/trace.h>
#include <zencore/workthreadpool.h>
#include <zenstore/scrubcontext.h>

#include <gsl/gsl-lite.hpp>

#include <xxhash.h>

#if ZEN_WITH_TESTS
#	include <zencore/compactbinarybuilder.h>
#	include <zencore/testing.h>
#	include <zencore/testutils.h>
#	include <zencore/workthreadpool.h>
#	include <zenstore/cidstore.h>
#	include <algorithm>
#	include <random>
ZEN_THIRD_PARTY_INCLUDES_START
#	include <tsl/robin_map.h>
ZEN_THIRD_PARTY_INCLUDES_END
#endif

//////////////////////////////////////////////////////////////////////////

namespace zen {

/** Fixed-size header stored at the start of a compact CAS index snapshot.
 *
 * The member order defines the serialized layout and must not be changed.
 * `Checksum` is the last member and is excluded from the checksummed range.
 */
struct CasDiskIndexHeader
{
	static constexpr uint32_t ExpectedMagic	 = 0x75696478;	// 'uidx';
	static constexpr uint32_t CurrentVersion = 1;

	uint32_t Magic			  = ExpectedMagic;
	uint32_t Version		  = CurrentVersion;
	uint64_t EntryCount		  = 0;
	uint64_t LogPosition	  = 0;
	uint32_t PayloadAlignment = 0;
	uint32_t Checksum		  = 0;

	/** Hashes the header bytes starting at `Magic` and stopping short of the trailing `Checksum` field.
	 *
	 * @param Header Header to compute the checksum for.
	 * @return 32-bit XXH32 digest of the covered bytes.
	 */
	static uint32_t ComputeChecksum(const CasDiskIndexHeader& Header)
	{
		// Everything except the final uint32_t (the checksum itself) is covered
		constexpr size_t   CoveredByteCount = sizeof(CasDiskIndexHeader) - sizeof(uint32_t);
		constexpr uint32_t ChecksumSeed		= 0xC0C0'BABA;

		return XXH32(&Header.Magic, CoveredByteCount, ChecksumSeed);
	}
};

static_assert(sizeof(CasDiskIndexHeader) == 32);

namespace {
	// File extensions used for the persisted index snapshot and the operation log
	const char* IndexExtension = ".uidx";
	const char* LogExtension   = ".ulog";

	// Directory holding all files of one container: <RootPath>/<ContainerBaseName>
	std::filesystem::path GetBasePath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
	{
		return RootPath / ContainerBaseName;
	}

	// Index snapshot file: <base>/<ContainerBaseName>.uidx
	std::filesystem::path GetIndexPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
	{
		return GetBasePath(RootPath, ContainerBaseName) / (ContainerBaseName + IndexExtension);
	}

	// Temporary index snapshot file: <base>/<ContainerBaseName>.tmp.uidx
	//
	// This is a temporary *index* file, so it carries the index extension (it previously
	// used the log extension, which made it look like a log file on disk).
	std::filesystem::path GetTempIndexPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
	{
		return GetBasePath(RootPath, ContainerBaseName) / (ContainerBaseName + ".tmp" + IndexExtension);
	}

	// Operation log file: <base>/<ContainerBaseName>.ulog
	std::filesystem::path GetLogPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
	{
		return GetBasePath(RootPath, ContainerBaseName) / (ContainerBaseName + LogExtension);
	}

	// Directory holding the block files: <base>/blocks
	std::filesystem::path GetBlocksBasePath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
	{
		return GetBasePath(RootPath, ContainerBaseName) / "blocks";
	}

	// Sanity checks a single index entry.
	//
	// Returns true when the entry is acceptable. On failure OutReason receives a
	// human readable description; on success OutReason is left untouched.
	bool ValidateEntry(const CasDiskIndexEntry& Entry, std::string& OutReason)
	{
		// A zero hash can never identify a chunk
		if (Entry.Key == IoHash::Zero)
		{
			OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString());
			return false;
		}

		// The tombstone bit is the only flag an entry may carry
		const auto UnexpectedFlags = Entry.Flags & ~CasDiskIndexEntry::kTombstone;
		if (UnexpectedFlags != 0)
		{
			OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Flags, Entry.Key.ToHexString());
			return false;
		}

		// Tombstones are accepted as-is, their content type and size are not inspected
		const bool IsTombstone = (Entry.Flags & CasDiskIndexEntry::kTombstone) != 0;
		if (IsTombstone)
		{
			return true;
		}

		if (Entry.ContentType != ZenContentType::kUnknownContentType)
		{
			OutReason =
				fmt::format("Invalid content type {} for entry {}", static_cast<uint8_t>(Entry.ContentType), Entry.Key.ToHexString());
			return false;
		}

		const uint64_t Size = Entry.Location.GetSize();
		if (Size != 0)
		{
			return true;
		}

		OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString());
		return false;
	}

}  // namespace

//////////////////////////////////////////////////////////////////////////

// Load factor bounds that the CasContainerStrategy constructor applies to m_LocationMap
static constexpr float IndexMinLoadFactor = 0.2f;
static constexpr float IndexMaxLoadFactor = 0.7f;

// Configures the location map load factors and registers this strategy with the
// garbage collector, first as a GC storage and then as a GC reference store.
CasContainerStrategy::CasContainerStrategy(GcManager& Gc)
: m_Log(logging::Get("containercas"))
, m_Gc(Gc)
{
	// Tune the hash map before any entries are inserted
	m_LocationMap.min_load_factor(IndexMinLoadFactor);
	m_LocationMap.max_load_factor(IndexMaxLoadFactor);

	// Registration order is mirrored (reversed) by the destructor
	m_Gc.AddGcStorage(this);
	m_Gc.AddGcReferenceStore(*this);
}

// Unregisters this strategy from the garbage collector. The calls are made in the
// reverse order of the registrations performed by the constructor.
CasContainerStrategy::~CasContainerStrategy()
{
	m_Gc.RemoveGcReferenceStore(*this);
	m_Gc.RemoveGcStorage(this);
}

// One-time setup of the container.
//
// RootDirectory     - directory under which the container directory lives
// ContainerBaseName - name of the container; also used to derive its file names
// MaxBlockSize      - must be greater than zero
// Alignment         - payload alignment, must be a power of two
// IsNewStore        - forwarded unchanged to OpenContainer
//
// Asserts if called on an already initialized instance.
void
CasContainerStrategy::Initialize(const std::filesystem::path& RootDirectory,
								 const std::string_view		  ContainerBaseName,
								 uint32_t					  MaxBlockSize,
								 uint32_t					  Alignment,
								 bool						  IsNewStore)
{
	// Preconditions
	ZEN_ASSERT(IsPow2(Alignment));
	ZEN_ASSERT(!m_IsInitialized);
	ZEN_ASSERT(MaxBlockSize > 0);

	// Block layout parameters
	m_MaxBlockSize	   = MaxBlockSize;
	m_PayloadAlignment = Alignment;

	// Where the container lives on disk
	m_RootDirectory		= RootDirectory;
	m_ContainerBaseName = ContainerBaseName;
	m_BlocksBasePath	= GetBlocksBasePath(m_RootDirectory, m_ContainerBaseName);

	OpenContainer(IsNewStore);

	// Only flagged as initialized once OpenContainer has returned
	m_IsInitialized = true;
}

// Stores one chunk unless its hash is already present in the location map.
//
// Returns {.New = false} when the hash was already known at the time of the initial
// lookup, otherwise writes the chunk and returns {.New = true}.
CasStore::InsertResult
CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash)
{
	ZEN_TRACE_CPU("CasContainer::InsertChunk");

	// Cheap early-out under a shared lock; the lock is released before any data is written
	const bool IsAlreadyStored = [&] {
		RwLock::SharedLockScope _(m_LocationMapLock);
		return m_LocationMap.contains(ChunkHash);
	}();
	if (IsAlreadyStored)
	{
		return CasStore::InsertResult{.New = false};
	}

	// Note on concurrent inserts of the same chunk:
	//
	// The lookup above and the index update below are not performed atomically, so two
	// InsertChunk calls for the same hash can both pass the lookup and both write the
	// payload. The data then exists in more than one location while the hash refers to
	// just one of them; the duplicate is wasted space until the next GC operation.
	//
	// This is expected to be rare, and in exchange reads, inserts and GC are blocked
	// for a shorter time.

	const auto OnChunkWritten = [&](const BlockStoreLocation& WrittenLocation) {
		const BlockStoreDiskLocation PackedLocation(WrittenLocation, m_PayloadAlignment);

		// Record the insert in the log before making it visible in the in-memory index
		const CasDiskIndexEntry LogRecord{.Key = ChunkHash, .Location = PackedLocation};
		m_CasLog.Append(LogRecord);

		RwLock::ExclusiveLockScope _(m_LocationMapLock);
		const size_t			   NewLocationIndex = m_Locations.size();
		m_LocationMap.emplace(ChunkHash, NewLocationIndex);
		m_Locations.push_back(PackedLocation);
	};
	m_BlockStore.WriteChunk(ChunkData, ChunkSize, m_PayloadAlignment, OnChunkWritten);

	return CasStore::InsertResult{.New = true};
}

// Convenience overload that stores the contents of an IoBuffer.
//
// Outside of test builds the buffer is required to be tagged as compressed binary.
CasStore::InsertResult
CasContainerStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
{
#if !ZEN_WITH_TESTS
	ZEN_ASSERT(Chunk.GetContentType() == ZenContentType::kCompressedBinary);
#endif
	const void*	 Payload	 = Chunk.Data();
	const size_t PayloadSize = Chunk.Size();
	return InsertChunk(Payload, PayloadSize, ChunkHash);
}

// Stores a batch of chunks, skipping those whose hash is already in the location map.
//
// Chunks and ChunkHashes must have the same length; element i of the returned vector
// tells whether chunk i was considered new at the time of the initial lookup.
std::vector<CasStore::InsertResult>
CasContainerStrategy::InsertChunks(std::span<IoBuffer> Chunks, std::span<IoHash> ChunkHashes)
{
	ZEN_ASSERT(Chunks.size() == ChunkHashes.size());

	// Result is constructed at its final size, so no further reserve is needed
	std::vector<CasStore::InsertResult> Result(Chunks.size());
	std::vector<size_t>					NewChunkIndexes;
	{
		RwLock::SharedLockScope _(m_LocationMapLock);
		for (size_t ChunkIndex = 0; ChunkIndex < ChunkHashes.size(); ChunkIndex++)
		{
			const IoHash& ChunkHash = ChunkHashes[ChunkIndex];
			bool		  IsNew		= !m_LocationMap.contains(ChunkHash);
			Result[ChunkIndex]		= CasStore::InsertResult{.New = IsNew};
			if (IsNew)
			{
				NewChunkIndexes.push_back(ChunkIndex);
			}
		}
	}

	if (NewChunkIndexes.empty())
	{
		return Result;
	}

	// Collect the payloads that actually need to be written, in NewChunkIndexes order
	std::vector<IoBuffer> Datas;
	Datas.reserve(NewChunkIndexes.size());
	for (size_t ChunkIndex : NewChunkIndexes)
	{
		const IoBuffer& Chunk = Chunks[ChunkIndex];
#if !ZEN_WITH_TESTS
		ZEN_ASSERT(Chunk.GetContentType() == ZenContentType::kCompressedBinary);
#endif
		Datas.emplace_back(Chunk);
	}

	// ChunkOffset is a cursor into NewChunkIndexes that persists across callback
	// invocations, mapping each reported location back to the chunk it was written for
	size_t ChunkOffset = 0;
	m_BlockStore.WriteChunks(Datas, m_PayloadAlignment, [&](std::span<BlockStoreLocation> Locations) {
		// Guard against being handed more locations than chunks were submitted
		ZEN_ASSERT(ChunkOffset + Locations.size() <= NewChunkIndexes.size());

		std::vector<CasDiskIndexEntry> IndexEntries;
		IndexEntries.reserve(Locations.size());
		for (const BlockStoreLocation& Location : Locations)
		{
			size_t ChunkIndex = NewChunkIndexes[ChunkOffset++];
			IndexEntries.emplace_back(
				CasDiskIndexEntry{.Key = ChunkHashes[ChunkIndex], .Location = BlockStoreDiskLocation(Location, m_PayloadAlignment)});
		}

		// Log first, then publish the entries in the in-memory index
		m_CasLog.Append(IndexEntries);
		{
			RwLock::ExclusiveLockScope _(m_LocationMapLock);
			for (const CasDiskIndexEntry& DiskIndexEntry : IndexEntries)
			{
				m_LocationMap.emplace(DiskIndexEntry.Key, m_Locations.size());
				m_Locations.push_back(DiskIndexEntry.Location);
			}
		}
	});
	return Result;
}

// Looks up a chunk by hash.
//
// Returns a default constructed IoBuffer when the hash is not in the location map,
// otherwise whatever the block store's TryGetChunk yields for the recorded location.
// The location map lock is held (shared) for the duration of the call.
IoBuffer
CasContainerStrategy::FindChunk(const IoHash& ChunkHash)
{
	ZEN_TRACE_CPU("CasContainer::FindChunk");

	RwLock::SharedLockScope _(m_LocationMapLock);

	if (const auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end())
	{
		const BlockStoreLocation& Location = m_Locations[KeyIt->second].Get(m_PayloadAlignment);
		return m_BlockStore.TryGetChunk(Location);
	}

	return IoBuffer();
}

// Returns true when the given hash is present in the location map.
bool
CasContainerStrategy::HaveChunk(const IoHash& ChunkHash)
{
	RwLock::SharedLockScope _(m_LocationMapLock);
	return m_LocationMap.find(ChunkHash) != m_LocationMap.end();
}

// Removes from InOutChunks every hash that this container already stores.
void
CasContainerStrategy::FilterChunks(HashKeySet& InOutChunks)
{
	// Each hash is checked individually through HaveChunk. That is adequate for
	// relatively small sets of chunk identifiers; for large sets - in particular
	// when most of the chunks are likely to be present already - an implementation
	// which removes items incrementally would serve better.

	const auto IsAlreadyStored = [&](const IoHash& Hash) { return HaveChunk(Hash); };
	InOutChunks.RemoveHashesIf(IsAlreadyStored);
}

// Invokes AsyncCallback for every requested chunk that is present in the location map.
//
// ChunkHashes        - hashes to look up; hashes that are not found are skipped
// AsyncCallback      - receives the index into ChunkHashes and the payload; returning
//                      false requests that iteration stops
// OptionalWorkerPool - when non-null each block is processed as a separate work item on
//                      the pool (the callback may then run concurrently), otherwise
//                      blocks are processed inline on the calling thread
//
// Returns false if the callback or the block store requested a stop, true otherwise.
// The location map lock is held (shared) until all scheduled work has completed.
bool
CasContainerStrategy::IterateChunks(std::span<IoHash>												  ChunkHashes,
									const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
									WorkerThreadPool*												  OptionalWorkerPool)
{
	// FoundChunkIndexes[i] is the position in ChunkHashes of FoundChunkLocations[i]
	std::vector<size_t>				FoundChunkIndexes;
	std::vector<BlockStoreLocation> FoundChunkLocations;
	RwLock::SharedLockScope			_(m_LocationMapLock);
	for (size_t ChunkIndex = 0; ChunkIndex < ChunkHashes.size(); ChunkIndex++)
	{
		if (auto KeyIt = m_LocationMap.find(ChunkHashes[ChunkIndex]); KeyIt != m_LocationMap.end())
		{
			FoundChunkIndexes.push_back(ChunkIndex);
			FoundChunkLocations.push_back(m_Locations[KeyIt->second].Get(m_PayloadAlignment));
		}
	}

	auto DoOneBlock = [&](std::span<const size_t> ChunkIndexes) {
		return m_BlockStore.IterateBlock(
			FoundChunkLocations,
			ChunkIndexes,
			[&](size_t ChunkIndex, const void* Data, uint64_t Size) {
				if (Data == nullptr)
				{
					// No data available for this chunk, skip it but keep iterating
					return true;
				}
				return AsyncCallback(FoundChunkIndexes[ChunkIndex], IoBuffer(IoBuffer::Wrap, Data, Size));
			},
			[&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
				return AsyncCallback(FoundChunkIndexes[ChunkIndex], File.GetChunk(Offset, Size));
			});
	};

	// The latch starts at one so that it can not reach zero before all work has been scheduled
	Latch			 WorkLatch(1);
	std::atomic_bool AsyncContinue = true;
	bool Continue = m_BlockStore.IterateChunks(FoundChunkLocations, [&](uint32_t BlockIndex, std::span<const size_t> ChunkIndexes) {
		if (OptionalWorkerPool)
		{
			WorkLatch.AddCount(1);
			// BlockIndex and ChunkIndexes belong to this callback invocation and are gone by
			// the time the work item runs, so both are captured by value. Everything else
			// captured by reference outlives the work item since we wait on the latch below.
			OptionalWorkerPool->ScheduleWork(
				[&, BlockIndex, ChunkIndexes = std::vector<size_t>(ChunkIndexes.begin(), ChunkIndexes.end())]() {
					auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
					if (!AsyncContinue)
					{
						return;
					}
					try
					{
						const bool KeepGoing = DoOneBlock(ChunkIndexes);
						if (!KeepGoing)
						{
							AsyncContinue.store(false);
						}
					}
					catch (const std::exception& Ex)
					{
						ZEN_WARN("Failed iterating chunks for cas root path {}, block {}. Reason: '{}'",
								 m_RootDirectory,
								 BlockIndex,
								 Ex.what());
					}
				});
			return AsyncContinue.load();
		}
		else
		{
			return DoOneBlock(ChunkIndexes);
		}
	});

	// Release our own count and wait for all outstanding work items
	WorkLatch.CountDown();
	WorkLatch.Wait();
	return AsyncContinue.load() && Continue;
}

// Flushes the block store (without forcing a new block) and the CAS log, then writes
// an index snapshot via MakeIndexSnapshot.
void
CasContainerStrategy::Flush()
{
	ZEN_TRACE_CPU("CasContainer::Flush");
	// NOTE(review): the order (block data, then log, then snapshot) presumably ensures
	// the log never refers to unflushed block data - confirm before reordering
	m_BlockStore.Flush(/*ForceNewBlock*/ false);
	m_CasLog.Flush();
	MakeIndexSnapshot();
}

// Validates every chunk referenced by the location map.
//
// A chunk is considered bad when no data is available for its location, when its
// compressed header does not validate, or when the raw hash stored in the header
// differs from the key it is indexed under. Bad chunks are reported to Ctx and, if
// Ctx.RunRecovery() is set, removed from the index with tombstones appended to the log.
//
// The whole pass is skipped when Ctx.IsSkipCas() is set. If the scrub deadline expires
// the pass ends early and only the chunks visited so far are reported.
void
CasContainerStrategy::ScrubStorage(ScrubContext& Ctx)
{
	ZEN_TRACE_CPU("CasContainer::ScrubStorage");

	if (Ctx.IsSkipCas())
	{
		ZEN_INFO("SKIPPED scrubbing: '{}'", m_BlocksBasePath);
		return;
	}

	ZEN_INFO("scrubbing '{}'", m_BlocksBasePath);

	// BadKeys is appended to from the validation callbacks, always under BadKeysLock
	RwLock							BadKeysLock;
	std::vector<IoHash>				BadKeys;
	std::atomic_uint64_t			ChunkCount{0}, ChunkBytes{0};
	// ChunkLocations[i] and ChunkIndexToChunkHash[i] describe the same chunk
	std::vector<BlockStoreLocation> ChunkLocations;
	std::vector<IoHash>				ChunkIndexToChunkHash;

	try
	{
		// Held (shared) for the whole validation pass
		RwLock::SharedLockScope _(m_LocationMapLock);

		uint64_t TotalChunkCount = m_LocationMap.size();
		ChunkLocations.reserve(TotalChunkCount);
		ChunkIndexToChunkHash.reserve(TotalChunkCount);
		{
			for (const auto& Entry : m_LocationMap)
			{
				const IoHash&				  ChunkHash	   = Entry.first;
				const BlockStoreDiskLocation& DiskLocation = m_Locations[Entry.second];
				BlockStoreLocation			  Location	   = DiskLocation.Get(m_PayloadAlignment);

				ChunkLocations.push_back(Location);
				ChunkIndexToChunkHash.push_back(ChunkHash);
			}
		}

		// Validates a chunk handed over as a memory range. Always returns true so that
		// iteration continues past bad chunks.
		const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) {
			ChunkCount.fetch_add(1);
			ChunkBytes.fetch_add(Size);

			const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
			if (!Data)
			{
				// ChunkLocation out of range of stored blocks
				BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Hash); });
				return true;
			}

			IoBuffer Buffer(IoBuffer::Wrap, Data, Size);
			IoHash	 RawHash;
			uint64_t RawSize;
			if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize))
			{
				if (RawHash == Hash)
				{
					// TODO: this should also hash the (decompressed) contents
					return true;
				}
			}
			// Header invalid, or the header's raw hash does not match the index key
			BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Hash); });
			return true;
		};

		// Validates a chunk handed over as a file range. This is the only place where
		// the scrub deadline is checked. Always returns true unless the deadline check throws.
		const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
			Ctx.ThrowIfDeadlineExpired();

			ChunkCount.fetch_add(1);
			ChunkBytes.fetch_add(Size);

			const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
			IoBuffer	  Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size);

			IoHash	 RawHash;
			uint64_t RawSize;
			// TODO: Add API to verify compressed buffer without having to memory-map the whole file
			if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize))
			{
				if (RawHash == Hash)
				{
					// TODO: this should also hash the (decompressed) contents
					return true;
				}
			}
			// Header invalid, or the header's raw hash does not match the index key
			BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Hash); });
			return true;
		};

		m_BlockStore.IterateChunks(ChunkLocations, [&](uint32_t, std::span<const size_t> ChunkIndexes) {
			return m_BlockStore.IterateBlock(ChunkLocations, ChunkIndexes, ValidateSmallChunk, ValidateLargeChunk);
		});
	}
	catch (const ScrubDeadlineExpiredException&)
	{
		// Not an error: whatever has been collected so far is still reported below
		ZEN_INFO("Scrubbing deadline expired, operation incomplete");
	}

	Ctx.ReportScrubbed(ChunkCount, ChunkBytes);

	if (!BadKeys.empty())
	{
		ZEN_WARN("Scrubbing found {} bad chunks in '{}'", BadKeys.size(), m_RootDirectory / m_ContainerBaseName);

		if (Ctx.RunRecovery())
		{
			// Deal with bad chunks by removing them from our lookup map

			std::vector<CasDiskIndexEntry> LogEntries;
			LogEntries.reserve(BadKeys.size());
			{
				RwLock::ExclusiveLockScope IndexLock(m_LocationMapLock);
				for (const IoHash& ChunkHash : BadKeys)
				{
					const auto KeyIt = m_LocationMap.find(ChunkHash);
					if (KeyIt == m_LocationMap.end())
					{
						// Might have been GC'd
						continue;
					}
					// Capture the tombstone before the map entry (and its location index) goes away
					LogEntries.push_back(
						{.Key = KeyIt->first, .Location = m_Locations[KeyIt->second], .Flags = CasDiskIndexEntry::kTombstone});
					m_LocationMap.erase(KeyIt);
				}

				// Clean up m_Locations vectors
				CompactIndex(IndexLock);
			}
			// The log is appended to after the index lock has been released
			m_CasLog.Append(LogEntries);
		}
	}

	// Let whomever it concerns know about the bad chunks. This could
	// be used to invalidate higher level data structures more efficiently
	// than a full validation pass might be able to do
	if (!BadKeys.empty())
	{
		Ctx.ReportBadCidChunks(BadKeys);
	}

	ZEN_INFO("scrubbed {} chunks ({}) in '{}'", ChunkCount.load(), NiceBytes(ChunkBytes.load()), m_RootDirectory / m_ContainerBaseName);
}

// Reclaims space held by chunks that GcCtx no longer wants to keep.
//
// Does nothing when GcCtx.SkipCid() is set. When the context is not in deletion mode,
// or small objects are not being collected, the block store is asked for a dry run
// only and the index is left untouched. Hashes of chunks that were actually removed
// are reported through GcCtx.AddDeletedCids.
void
CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
{
	ZEN_TRACE_CPU("CasContainer::CollectGarbage");

	if (GcCtx.SkipCid())
	{
		return;
	}

	// It collects all the blocks that we want to delete chunks from. For each such
	// block we keep a list of chunks to retain and a list of chunks to delete.
	//
	// If there is a block that we are currently writing to, that block is omitted
	// from the garbage collection.
	//
	// Next it will iterate over all blocks that we want to remove chunks from.
	// If the block is empty after removal of chunks we mark the block as pending
	// delete - we want to delete it as soon as there are no IoBuffers using the
	// block file.
	// Once complete we update the m_LocationMap by removing the chunks.
	//
	// If the block is non-empty we write out the chunks we want to keep to a new
	// block file (creating new block files as needed).
	//
	// We update the index as we complete each new block file. This makes it possible
	// to break the GC if we want to limit time for execution.
	//
	// GC can run in parallel with regular operation - it will block while taking
	// a snapshot of the current m_LocationMap state and while moving blocks it will
	// do a blocking operation and update the m_LocationMap after each new block is
	// written and figuring out the path to the next new block.

	ZEN_DEBUG("collecting garbage from '{}'", m_RootDirectory / m_ContainerBaseName);

	uint64_t WriteBlockTimeUs		 = 0;
	uint64_t WriteBlockLongestTimeUs = 0;
	uint64_t ReadBlockTimeUs		 = 0;
	uint64_t ReadBlockLongestTimeUs	 = 0;

	// Snapshot of the index taken under a shared lock. Everything named without the m_
	// prefix below refers to this snapshot, NOT to the live index.
	LocationMap_t						LocationMap;
	std::vector<BlockStoreDiskLocation> Locations;
	BlockStore::ReclaimSnapshotState	BlockStoreState;
	{
		ZEN_TRACE_CPU("CasContainer::CollectGarbage::State");

		RwLock::SharedLockScope ___(m_LocationMapLock);
		Stopwatch				Timer;
		const auto				____ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] {
			uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
			ReadBlockTimeUs += ElapsedUs;
			ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
		});
		LocationMap					 = m_LocationMap;
		Locations					 = m_Locations;
		BlockStoreState				 = m_BlockStore.GetReclaimSnapshotState();
	}

	uint64_t TotalChunkCount = LocationMap.size();

	std::vector<IoHash> TotalChunkHashes;
	TotalChunkHashes.reserve(TotalChunkCount);
	for (const auto& Entry : LocationMap)
	{
		TotalChunkHashes.push_back(Entry.first);
	}

	// ChunkLocations[i] and ChunkIndexToChunkHash[i] describe the same chunk;
	// KeepChunkIndexes holds the indexes of the chunks that must survive
	std::vector<BlockStoreLocation> ChunkLocations;
	BlockStore::ChunkIndexArray		KeepChunkIndexes;
	std::vector<IoHash>				ChunkIndexToChunkHash;
	ChunkLocations.reserve(TotalChunkCount);
	KeepChunkIndexes.reserve(TotalChunkCount);
	ChunkIndexToChunkHash.reserve(TotalChunkCount);

	{
		ZEN_TRACE_CPU("CasContainer::CollectGarbage::Filter");
		GcCtx.FilterCids(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) {
			auto						  KeyIt		   = LocationMap.find(ChunkHash);
			const BlockStoreDiskLocation& DiskLocation = Locations[KeyIt->second];
			BlockStoreLocation			  Location	   = DiskLocation.Get(m_PayloadAlignment);
			size_t						  ChunkIndex   = ChunkLocations.size();

			ChunkLocations.push_back(Location);
			ChunkIndexToChunkHash.push_back(ChunkHash);
			if (Keep)
			{
				KeepChunkIndexes.push_back(ChunkIndex);
			}
		});
	}

	const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects();
	if (!PerformDelete)
	{
		// Dry run, no index changes are reported back
		m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, true);
		return;
	}

	std::vector<IoHash> DeletedChunks;
	m_BlockStore.ReclaimSpace(
		BlockStoreState,
		ChunkLocations,
		KeepChunkIndexes,
		m_PayloadAlignment,
		false,
		[&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& RemovedChunks) {
			std::vector<CasDiskIndexEntry> LogEntries;
			LogEntries.reserve(MovedChunks.size() + RemovedChunks.size());
			{
				RwLock::ExclusiveLockScope __(m_LocationMapLock);
				Stopwatch				   Timer;
				const auto				   ____ = MakeGuard([&] {
					uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
					WriteBlockTimeUs += ElapsedUs;
					WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
				});
				for (const auto& Entry : MovedChunks)
				{
					size_t					  ChunkIndex  = Entry.first;
					const BlockStoreLocation& NewLocation = Entry.second;
					const IoHash&			  ChunkHash	  = ChunkIndexToChunkHash[ChunkIndex];

					// Use find rather than operator[] so that a key which has disappeared
					// from the live index is not silently re-inserted with a bogus index
					const auto LiveIt = m_LocationMap.find(ChunkHash);
					if (LiveIt == m_LocationMap.end())
					{
						// Entry has been removed while GC was running, ignore the move
						continue;
					}
					BlockStoreDiskLocation& Location = m_Locations[LiveIt->second];
					if (Locations[LocationMap[ChunkHash]] != Location)
					{
						// Entry has been updated while GC was running, ignore the move
						continue;
					}
					Location = {NewLocation, m_PayloadAlignment};
					LogEntries.push_back({.Key = ChunkHash, .Location = Location});
				}
				for (const size_t ChunkIndex : RemovedChunks)
				{
					const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];

					const auto LiveIt = m_LocationMap.find(ChunkHash);
					if (LiveIt == m_LocationMap.end())
					{
						// Entry has already been removed while GC was running, nothing to delete
						continue;
					}
					// The live location must come from m_Locations: the index stored in
					// m_LocationMap is only meaningful for the live vector, and comparing the
					// snapshot against itself could never detect a concurrent update
					const BlockStoreDiskLocation& Location = m_Locations[LiveIt->second];
					if (Locations[LocationMap[ChunkHash]] != Location)
					{
						// Entry has been updated while GC was running, ignore the delete
						continue;
					}
					LogEntries.push_back({.Key = ChunkHash, .Location = Location, .Flags = CasDiskIndexEntry::kTombstone});
					m_LocationMap.erase(LiveIt);
					DeletedChunks.push_back(ChunkHash);
				}
			}

			// Persist the index changes once the lock has been released
			m_CasLog.Append(LogEntries);
			m_CasLog.Flush();
		},
		[&GcCtx]() { return GcCtx.ClaimGCReserve(); });

	if (!DeletedChunks.empty())
	{
		// Clean up m_Locations vectors
		RwLock::ExclusiveLockScope IndexLock(m_LocationMapLock);
		CompactIndex(IndexLock);
	}
	GcCtx.AddDeletedCids(DeletedChunks);
}

class CasContainerStoreCompactor : public GcStoreCompactor
{
public:
	CasContainerStoreCompactor(CasContainerStrategy& Owner) : m_CasContainerStrategy(Owner) {}

	virtual void CompactStore(GcCtx& Ctx, GcCompactStoreStats& Stats, const std::function<uint64_t()>& ClaimDiskReserveCallback) override
	{
		ZEN_TRACE_CPU("CasContainer::CompactStore");

		Stopwatch  Timer;
		const auto _ = MakeGuard([&] {
			if (!Ctx.Settings.Verbose)
			{
				return;
			}
			ZEN_INFO("GCV2: compactcas [COMPACT] '{}': RemovedDisk: {} in {}",
					 m_CasContainerStrategy.m_RootDirectory.filename() / m_CasContainerStrategy.m_ContainerBaseName,
					 NiceBytes(Stats.RemovedDisk),
					 NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
		});

		if (Ctx.Settings.CollectSmallObjects)
		{
			BlockStore::BlockUsageMap BlockUsage;
			{
				RwLock::SharedLockScope __(m_CasContainerStrategy.m_LocationMapLock);
				if (Ctx.IsCancelledFlag.load())
				{
					return;
				}

				for (const auto& Entry : m_CasContainerStrategy.m_LocationMap)
				{
					size_t						  Index = Entry.second;
					const BlockStoreDiskLocation& Loc	= m_CasContainerStrategy.m_Locations[Index];

					uint32_t BlockIndex = Loc.GetBlockIndex();
					uint64_t ChunkSize	= RoundUp(Loc.GetSize(), m_CasContainerStrategy.m_PayloadAlignment);
					if (auto It = BlockUsage.find(BlockIndex); It != BlockUsage.end())
					{
						It->second.EntryCount++;
						It->second.DiskUsage += ChunkSize;
					}
					else
					{
						BlockUsage.insert_or_assign(BlockIndex, BlockStore::BlockUsageInfo{.DiskUsage = ChunkSize, .EntryCount = 1});
					}
				}
			}

			{
				BlockStoreCompactState BlockCompactState;
				std::vector<IoHash>	   BlockCompactStateKeys;

				BlockStore::BlockEntryCountMap BlocksToCompact =
					m_CasContainerStrategy.m_BlockStore.GetBlocksToCompact(BlockUsage, Ctx.Settings.CompactBlockUsageThresholdPercent);
				BlockCompactState.IncludeBlocks(BlocksToCompact);

				if (BlocksToCompact.size() > 0)
				{
					{
						RwLock::SharedLockScope __(m_CasContainerStrategy.m_LocationMapLock);
						for (const auto& Entry : m_CasContainerStrategy.m_LocationMap)
						{
							size_t						  Index = Entry.second;
							const BlockStoreDiskLocation& Loc	= m_CasContainerStrategy.m_Locations[Index];

							if (!BlockCompactState.AddKeepLocation(Loc.Get(m_CasContainerStrategy.m_PayloadAlignment)))
							{
								continue;
							}
							BlockCompactStateKeys.push_back(Entry.first);
						}
					}

					if (Ctx.Settings.IsDeleteMode)
					{
						if (Ctx.Settings.Verbose)
						{
							ZEN_INFO("GCV2: compactcas [COMPACT] '{}': compacting {} blocks",
									 m_CasContainerStrategy.m_RootDirectory.filename() / m_CasContainerStrategy.m_ContainerBaseName,
									 BlocksToCompact.size());
						}

						m_CasContainerStrategy.m_BlockStore.CompactBlocks(
							BlockCompactState,
							m_CasContainerStrategy.m_PayloadAlignment,
							[&](const BlockStore::MovedChunksArray& MovedArray, uint64_t FreedDiskSpace) {
								std::vector<CasDiskIndexEntry> MovedEntries;
								RwLock::ExclusiveLockScope	   _(m_CasContainerStrategy.m_LocationMapLock);
								for (const std::pair<size_t, BlockStoreLocation>& Moved : MovedArray)
								{
									size_t		  ChunkIndex = Moved.first;
									const IoHash& Key		 = BlockCompactStateKeys[ChunkIndex];

									if (auto It = m_CasContainerStrategy.m_LocationMap.find(Key);
										It != m_CasContainerStrategy.m_LocationMap.end())
									{
										BlockStoreDiskLocation&	  Location	  = m_CasContainerStrategy.m_Locations[It->second];
										const BlockStoreLocation& OldLocation = BlockCompactState.GetLocation(ChunkIndex);
										if (Location.Get(m_CasContainerStrategy.m_PayloadAlignment) != OldLocation)
										{
											// Someone has moved our chunk so lets just skip the new location we were provided, it will be
											// GC:d at a later time
											continue;
										}
										const BlockStoreLocation& NewLocation = Moved.second;

										Location = BlockStoreDiskLocation(NewLocation, m_CasContainerStrategy.m_PayloadAlignment);
										MovedEntries.push_back(CasDiskIndexEntry{.Key = Key, .Location = Location});
									}
								}
								m_CasContainerStrategy.m_CasLog.Append(MovedEntries);
								Stats.RemovedDisk += FreedDiskSpace;
								if (Ctx.IsCancelledFlag.load())
								{
									return false;
								}
								return true;
							},
							ClaimDiskReserveCallback,
							fmt::format("GCV2: compactcas [COMPACT] '{}': ",
										m_CasContainerStrategy.m_RootDirectory.filename() / m_CasContainerStrategy.m_ContainerBaseName));
					}
					else
					{
						if (Ctx.Settings.Verbose)
						{
							ZEN_INFO("GCV2: compactcas [COMPACT] '{}': skipped compacting of {} eligible blocks",
									 m_CasContainerStrategy.m_RootDirectory.filename() / m_CasContainerStrategy.m_ContainerBaseName,
									 BlocksToCompact.size());
						}
					}
				}
			}
		}
	}

	virtual std::string GetGcName(GcCtx& Ctx) override { return m_CasContainerStrategy.GetGcName(Ctx); }

	CasContainerStrategy& m_CasContainerStrategy;
};

// Reference pruner for a compact cas container.
//
// Holds the content ids handed over at construction and, when asked, removes the ones reported as unused
// from the owning strategy's location map, writing a tombstone entry to the cas log for each of them.
class CasContainerReferencePruner : public GcReferencePruner
{
public:
	CasContainerReferencePruner(CasContainerStrategy& Owner, std::vector<IoHash>&& Cids)
	: m_CasContainerStrategy(Owner)
	, m_Cids(std::move(Cids))
	{
	}

	virtual std::string GetGcName(GcCtx& Ctx) override { return m_CasContainerStrategy.GetGcName(Ctx); }

	// Asks GetUnusedReferences which of the held cids are unused and, in delete mode, drops them from the
	// location map. Returns nullptr when there is nothing to collect, when small object collection is
	// disabled or when the GC was cancelled; otherwise returns a newly allocated compactor for the store.
	virtual GcStoreCompactor* RemoveUnreferencedData(GcCtx&							Ctx,
													 GcStats&						Stats,
													 const GetUnusedReferencesFunc& GetUnusedReferences) override
	{
		ZEN_TRACE_CPU("CasContainer::RemoveUnreferencedData");

		CasContainerStrategy& Owner = m_CasContainerStrategy;

		Stopwatch  Timer;
		const auto _ = MakeGuard([&] {
			// Summary is only emitted for verbose runs
			if (Ctx.Settings.Verbose)
			{
				ZEN_INFO("GCV2: compactcas [PRUNE] '{}': Checked: {}, Deleted: {}, FreedMemory: {} in {}",
						 m_CasContainerStrategy.m_RootDirectory.filename() / m_CasContainerStrategy.m_ContainerBaseName,
						 Stats.CheckedCount,
						 Stats.DeletedCount,
						 NiceBytes(Stats.FreedMemory),
						 NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
			}
		});

		std::vector<IoHash> Unreferenced = GetUnusedReferences(m_Cids);
		Stats.CheckedCount				 = m_Cids.size();
		Stats.FoundCount				 = Unreferenced.size();

		// Nothing to collect, or we are not allowed to touch small objects
		if (Unreferenced.empty() || !Ctx.Settings.CollectSmallObjects)
		{
			return nullptr;
		}

		if (Ctx.Settings.IsDeleteMode)
		{
			std::vector<CasDiskIndexEntry> Tombstones;
			Tombstones.reserve(Unreferenced.size());

			RwLock::ExclusiveLockScope __(Owner.m_LocationMapLock);
			if (Ctx.IsCancelledFlag.load())
			{
				return nullptr;
			}

			// First pass: build a tombstone for every unused cid that is still present in the map
			for (const IoHash& UnusedCid : Unreferenced)
			{
				auto Found = Owner.m_LocationMap.find(UnusedCid);
				if (Found == Owner.m_LocationMap.end())
				{
					continue;
				}
				Tombstones.push_back(
					{.Key = UnusedCid, .Location = Owner.m_Locations[Found->second], .Flags = CasDiskIndexEntry::kTombstone});
			}

			// Second pass: remove them from the map and persist the tombstones while still holding the lock
			if (!Tombstones.empty())
			{
				for (const CasDiskIndexEntry& Tombstone : Tombstones)
				{
					Owner.m_LocationMap.erase(Tombstone.Key);
					Stats.DeletedCount++;
				}
				Owner.m_CasLog.Append(Tombstones);
				Owner.m_CasLog.Flush();
			}
		}

		return new CasContainerStoreCompactor(Owner);
	}

private:
	CasContainerStrategy& m_CasContainerStrategy;
	std::vector<IoHash>	  m_Cids;
};

// Builds the name used to identify this container in GC output: "compactcas: '<root>/<container>'"
std::string
CasContainerStrategy::GetGcName(GcCtx&)
{
	const auto ContainerPath = m_RootDirectory / m_ContainerBaseName;
	return fmt::format("compactcas: '{}'", ContainerPath.string());
}

// Captures the keys currently present in the location map and hands them to a newly allocated pruner.
// Returns nullptr when the container holds no entries or the GC has been cancelled.
GcReferencePruner*
CasContainerStrategy::CreateReferencePruner(GcCtx& Ctx, GcReferenceStoreStats&)
{
	ZEN_TRACE_CPU("CasContainer::CreateReferencePruner");

	Stopwatch  Timer;
	const auto _ = MakeGuard([&] {
		// Timing is only reported for verbose runs
		if (Ctx.Settings.Verbose)
		{
			ZEN_INFO("GCV2: compactcas [CREATE PRUNER] '{}' in {}",
					 m_RootDirectory / m_ContainerBaseName,
					 NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
		}
	});

	std::vector<IoHash> Candidates;
	{
		RwLock::SharedLockScope __(m_LocationMapLock);
		if (m_LocationMap.empty() || Ctx.IsCancelledFlag.load())
		{
			return nullptr;
		}

		Candidates.reserve(m_LocationMap.size());
		for (const auto& KeyAndIndex : m_LocationMap)
		{
			Candidates.push_back(KeyAndIndex.first);
		}
	}
	return new CasContainerReferencePruner(*this, std::move(Candidates));
}

// Rebuilds m_Locations so it holds exactly one slot per key still present in m_LocationMap, and rebuilds
// m_LocationMap to point at the new slots. Slots no longer referenced by any key are dropped.
//
// The unnamed parameter only exists to make the caller prove it holds an exclusive lock scope
// (presumably on m_LocationMapLock - confirm against the callers).
void
CasContainerStrategy::CompactIndex(RwLock::ExclusiveLockScope&)
{
	ZEN_TRACE_CPU("CasContainer::CompactIndex");

	size_t								EntryCount = m_LocationMap.size();
	LocationMap_t						LocationMap;
	std::vector<BlockStoreDiskLocation> Locations;
	Locations.reserve(EntryCount);
	LocationMap.reserve(EntryCount);

	// Iterate by reference, copying every key/index pair per iteration is unnecessary work
	for (const auto& Entry : m_LocationMap)
	{
		size_t EntryIndex = Locations.size();
		Locations.push_back(m_Locations[Entry.second]);
		LocationMap.insert({Entry.first, EntryIndex});
	}

	m_LocationMap.swap(LocationMap);
	m_Locations.swap(Locations);
}

// Reports the storage used by this container; only the disk size is filled in, taken from the block store
GcStorageSize
CasContainerStrategy::StorageSize() const
{
	GcStorageSize Result{};
	Result.DiskSize = m_BlockStore.TotalSize();
	return Result;
}

// Writes the current content of the location map to the index snapshot file.
//
// Does nothing when the log count has not changed since the last snapshot (m_LogFlushPosition).
// The existing index file is renamed to a temp path while the new one is written; if writing throws, the
// old index is moved back. Failures are logged as warnings and not propagated from the try block.
void
CasContainerStrategy::MakeIndexSnapshot()
{
	ZEN_TRACE_CPU("CasContainer::MakeIndexSnapshot");

	// Nothing has been appended to the log since the last snapshot
	uint64_t LogCount = m_CasLog.GetLogCount();
	if (m_LogFlushPosition == LogCount)
	{
		return;
	}

	ZEN_DEBUG("write store snapshot for '{}'", m_RootDirectory / m_ContainerBaseName);
	uint64_t   EntryCount = 0;
	Stopwatch  Timer;
	const auto _ = MakeGuard([&] {
		// EntryCount stays 0 unless the snapshot was written successfully
		ZEN_INFO("wrote store snapshot for '{}' containing {} entries in {}",
				 m_RootDirectory / m_ContainerBaseName,
				 EntryCount,
				 NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
	});

	namespace fs = std::filesystem;

	fs::path IndexPath	   = GetIndexPath(m_RootDirectory, m_ContainerBaseName);
	fs::path TempIndexPath = GetTempIndexPath(m_RootDirectory, m_ContainerBaseName);

	// Remove any temp snapshot file that already exists; give up if it can not be removed
	if (fs::is_regular_file(TempIndexPath))
	{
		std::error_code Ec;
		if (!fs::remove(TempIndexPath, Ec) || Ec)
		{
			ZEN_WARN("snapshot failed to clean up temp snapshot at {}, reason: '{}'", TempIndexPath, Ec.message());
			return;
		}
	}

	try
	{
		// Move index away, we keep it if something goes wrong
		if (fs::is_regular_file(IndexPath))
		{
			fs::rename(IndexPath, TempIndexPath);
		}

		// Write the current state of the location map to a new index state
		std::vector<CasDiskIndexEntry> Entries;
		uint64_t					   IndexLogPosition = 0;

		{
			// Log position and entries are captured under the same shared lock
			RwLock::SharedLockScope ___(m_LocationMapLock);
			IndexLogPosition = m_CasLog.GetLogCount();
			Entries.resize(m_LocationMap.size());

			uint64_t EntryIndex = 0;
			for (auto& Entry : m_LocationMap)
			{
				CasDiskIndexEntry& IndexEntry = Entries[EntryIndex++];
				IndexEntry.Key				  = Entry.first;
				IndexEntry.Location			  = m_Locations[Entry.second];
			}
		}

		// Write to a temporary file next to the index and move it into place once complete
		TemporaryFile	ObjectIndexFile;
		std::error_code Ec;
		ObjectIndexFile.CreateTemporary(IndexPath.parent_path(), Ec);
		if (Ec)
		{
			throw std::system_error(Ec, fmt::format("Failed to create temp file for index snapshot at '{}'", IndexPath));
		}
		CasDiskIndexHeader Header = {.EntryCount	   = Entries.size(),
									 .LogPosition	   = IndexLogPosition,
									 .PayloadAlignment = gsl::narrow<uint32_t>(m_PayloadAlignment)};

		// Checksum is computed after all other header fields have been filled in
		Header.Checksum = CasDiskIndexHeader::ComputeChecksum(Header);

		// File layout: header at offset 0 followed directly by the entry array
		ObjectIndexFile.Write(&Header, sizeof(CasDiskIndexHeader), 0);
		ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexHeader));
		ObjectIndexFile.Flush();
		ObjectIndexFile.MoveTemporaryIntoPlace(IndexPath, Ec);
		if (Ec)
		{
			throw std::system_error(Ec, fmt::format("Failed to move temp file '{}' to '{}'", ObjectIndexFile.GetPath(), IndexPath));
		}
		// Only record the new flush position once the snapshot is in place
		EntryCount		   = Entries.size();
		m_LogFlushPosition = IndexLogPosition;
	}
	catch (const std::exception& Err)
	{
		ZEN_WARN("snapshot FAILED, reason: '{}'", Err.what());

		// Restore any previous snapshot

		if (fs::is_regular_file(TempIndexPath))
		{
			std::error_code Ec;
			fs::remove(IndexPath, Ec);	// We don't care if this fails, we try to move the old temp file regardless
			fs::rename(TempIndexPath, IndexPath, Ec);
			if (Ec)
			{
				ZEN_WARN("snapshot failed to restore old snapshot from {}, reason: '{}'", TempIndexPath, Ec.message());
			}
		}
	}
	// Remove the temp file if it is still present (new snapshot written, or restoring the old one failed)
	if (fs::is_regular_file(TempIndexPath))
	{
		std::error_code Ec;
		if (!fs::remove(TempIndexPath, Ec) || Ec)
		{
			ZEN_WARN("snapshot failed to remove temporary file {}, reason: '{}'", TempIndexPath, Ec.message());
		}
	}
}

// Loads the index snapshot at IndexPath into m_LocationMap / m_Locations.
//
// On success m_PayloadAlignment is taken from the header, OutVersion is set to
// CasDiskIndexHeader::CurrentVersion and the log position stored in the header is returned.
// If the file is smaller than a header, or the header fails validation (magic, version, checksum,
// payload alignment, entry count not fitting in the file), nothing is loaded, OutVersion is left
// untouched and 0 is returned.
uint64_t
CasContainerStrategy::ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion)
{
	ZEN_TRACE_CPU("CasContainer::ReadIndexFile");

	uint64_t   EntryCount = 0;
	Stopwatch  Timer;
	const auto _ = MakeGuard([&] {
		ZEN_INFO("read store '{}' index containing {} entries in {}", IndexPath, EntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
	});

	BasicFile ObjectIndexFile;
	ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead);
	uint64_t Size = ObjectIndexFile.FileSize();
	if (Size >= sizeof(CasDiskIndexHeader))
	{
		// Number of whole entries that fit in the file after the header. This must subtract the size of
		// the header itself; sizeof(sizeof(...)) would only subtract sizeof(size_t) and let an entry count
		// that exceeds the actual file content pass validation.
		uint64_t		   ExpectedEntryCount = (Size - sizeof(CasDiskIndexHeader)) / sizeof(CasDiskIndexEntry);
		CasDiskIndexHeader Header;
		ObjectIndexFile.Read(&Header, sizeof(Header), 0);
		if ((Header.Magic == CasDiskIndexHeader::ExpectedMagic) && (Header.Version == CasDiskIndexHeader::CurrentVersion) &&
			(Header.Checksum == CasDiskIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0) &&
			(Header.EntryCount <= ExpectedEntryCount))
		{
			m_PayloadAlignment = Header.PayloadAlignment;

			m_Locations.reserve(ExpectedEntryCount);
			m_LocationMap.reserve(ExpectedEntryCount);

			// Entries are read in batches of at most 128 KiB
			std::vector<CasDiskIndexEntry> Entries;
			Entries.resize(128 * 1024 / sizeof(CasDiskIndexEntry));

			uint64_t RemainingEntries = Header.EntryCount;
			uint64_t ReadOffset		  = sizeof(CasDiskIndexHeader);

			// An empty index has nothing to read, so no zero sized read is issued for it
			while (RemainingEntries > 0)
			{
				// The buffer only ever shrinks for the final, partial batch
				const uint64_t NumToRead = Min(RemainingEntries, Entries.size());
				Entries.resize(NumToRead);

				ObjectIndexFile.Read(Entries.data(), Entries.size() * sizeof(CasDiskIndexEntry), ReadOffset);

				std::string InvalidEntryReason;
				for (const CasDiskIndexEntry& Entry : Entries)
				{
					if (!ValidateEntry(Entry, InvalidEntryReason))
					{
						ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason);
						continue;
					}
					m_LocationMap[Entry.Key] = m_Locations.size();
					m_Locations.push_back(Entry.Location);
					++EntryCount;
				}

				RemainingEntries -= NumToRead;
				ReadOffset += NumToRead * sizeof(CasDiskIndexEntry);
			}

			OutVersion = CasDiskIndexHeader::CurrentVersion;
			return Header.LogPosition;
		}
		else
		{
			ZEN_WARN("skipping invalid index file '{}'", IndexPath);
		}
	}
	return 0;
}

// Replays the cas log at LogPath on top of the current m_LocationMap / m_Locations, starting after the
// first SkipEntryCount entries. Tombstone records remove their key, valid records add or replace it and
// invalid records are skipped with a warning.
//
// An invalid log file is removed and 0 is returned. If SkipEntryCount is larger than the number of entries
// in the log, the whole log is replayed instead. Returns the number of log entries that were replayed.
uint64_t
CasContainerStrategy::ReadLog(const std::filesystem::path& LogPath, uint64_t SkipEntryCount)
{
	ZEN_TRACE_CPU("CasContainer::ReadLog");

	if (!TCasLogFile<CasDiskIndexEntry>::IsValid(LogPath))
	{
		ZEN_WARN("removing invalid cas log at '{}'", LogPath);
		std::filesystem::remove(LogPath);
		return 0;
	}

	size_t	   LogEntryCount = 0;
	Stopwatch  Timer;
	const auto _ = MakeGuard([&] {
		ZEN_INFO("read store '{}' log containing {} entries in {}",
				 m_RootDirectory / m_ContainerBaseName,
				 LogEntryCount,
				 NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
	});

	TCasLogFile<CasDiskIndexEntry> CasLog;
	CasLog.Open(LogPath, CasLogFile::Mode::kRead);
	if (CasLog.Initialize())
	{
		uint64_t EntryCount = CasLog.GetLogCount();
		if (EntryCount < SkipEntryCount)
		{
			ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath);
			SkipEntryCount = 0;
		}

		// This is the full count of entries to replay. It must not be incremented again per replayed
		// record, that would report twice the number of entries.
		LogEntryCount = EntryCount - SkipEntryCount;
		CasLog.Replay(
			[&](const CasDiskIndexEntry& Record) {
				if (Record.Flags & CasDiskIndexEntry::kTombstone)
				{
					m_LocationMap.erase(Record.Key);
					return;
				}
				std::string InvalidEntryReason;
				if (!ValidateEntry(Record, InvalidEntryReason))
				{
					ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason);
					return;
				}
				m_LocationMap[Record.Key] = m_Locations.size();
				m_Locations.push_back(Record.Location);
			},
			SkipEntryCount);

		return LogEntryCount;
	}
	return 0;
}

void
CasContainerStrategy::OpenContainer(bool IsNewStore)
{
	ZEN_TRACE_CPU("CasContainer::OpenContainer");

	// Add .running file and delete on clean on close to detect bad termination

	m_LocationMap.clear();
	m_Locations.clear();

	std::filesystem::path BasePath = GetBasePath(m_RootDirectory, m_ContainerBaseName);

	if (IsNewStore)
	{
		std::filesystem::remove_all(BasePath);
	}

	CreateDirectories(BasePath);

	m_BlockStore.Initialize(m_BlocksBasePath, m_MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1);

	std::filesystem::path LogPath	= GetLogPath(m_RootDirectory, m_ContainerBaseName);
	std::filesystem::path IndexPath = GetIndexPath(m_RootDirectory, m_ContainerBaseName);

	if (std::filesystem::is_regular_file(IndexPath))
	{
		uint32_t IndexVersion = 0;
		m_LogFlushPosition	  = ReadIndexFile(IndexPath, IndexVersion);
		if (IndexVersion == 0)
		{
			ZEN_WARN("removing invalid index file at '{}'", IndexPath);
			std::filesystem::remove(IndexPath);
		}
	}

	uint64_t LogEntryCount = 0;
	if (std::filesystem::is_regular_file(LogPath))
	{
		if (TCasLogFile<CasDiskIndexEntry>::IsValid(LogPath))
		{
			LogEntryCount = ReadLog(LogPath, m_LogFlushPosition);
		}
		else
		{
			ZEN_WARN("removing invalid cas log at '{}'", LogPath);
			std::filesystem::remove(LogPath);
		}
	}

	m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite);

	BlockStore::BlockIndexSet KnownBlocks;

	for (const auto& Entry : m_LocationMap)
	{
		const BlockStoreDiskLocation& DiskLocation	= m_Locations[Entry.second];
		BlockStoreLocation			  BlockLocation = DiskLocation.Get(m_PayloadAlignment);
		KnownBlocks.Add(BlockLocation.BlockIndex);
	}

	m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks);

	if (IsNewStore || (LogEntryCount > 0))
	{
		MakeIndexSnapshot();
	}

	// TODO: should validate integrity of container files here
}

//////////////////////////////////////////////////////////////////////////

#if ZEN_WITH_TESTS

// Round trips 32 bit values through ToHexNumber / ParseHexNumber
TEST_CASE("compactcas.hex")
{
	uint32_t Parsed;
	CHECK(!ParseHexNumber("", Parsed));

	char Buffer[9];

	// Zero only has to survive the round trip, its textual form is not checked
	{
		ToHexNumber(0u, Buffer);
		std::string Text(Buffer);
		CHECK(ParseHexNumber(Text, Parsed));
		CHECK(Parsed == 0u);
	}

	struct RoundTripCase
	{
		uint32_t	Number;
		const char* Text;
	};

	const RoundTripCase Cases[] = {
		{std::numeric_limits<std::uint32_t>::max(), "ffffffff"},
		{0xadf14711u, "adf14711"},
		{0x80000000u, "80000000"},
		{0x718293a4u, "718293a4"},
	};

	for (const RoundTripCase& Case : Cases)
	{
		ToHexNumber(Case.Number, Buffer);
		std::string Text(Buffer);
		CHECK(Text == Case.Text);
		CHECK(ParseHexNumber(Text, Parsed));
		CHECK(Parsed == Case.Number);
	}
}

// Inserts a set of small compact binary objects and verifies they can be read back, both from the store
// that wrote them and from a store re-opened on the same directory
TEST_CASE("compactcas.compact.gc")
{
	ScopedTemporaryDirectory TempDir;

	const int kIterationCount = 1000;

	std::vector<IoHash> Keys(kIterationCount);

	// Every key must resolve to an object whose "id" field matches the index it was written with
	auto VerifyAllChunks = [&](CasContainerStrategy& Store) {
		for (int i = 0; i < kIterationCount; ++i)
		{
			IoBuffer Chunk = Store.FindChunk(Keys[i]);

			CHECK(!!Chunk);

			CbObject Value = LoadCompactBinaryObject(Chunk);

			CHECK_EQ(Value["id"].AsInt32(), i);
		}
	};

	{
		GcManager			 Gc;
		CasContainerStrategy Cas(Gc);
		Cas.Initialize(TempDir.Path(), "test", 65536, 16, true);

		for (int i = 0; i < kIterationCount; ++i)
		{
			CbObjectWriter Writer;
			Writer << "id" << i;
			CbObject Object = Writer.Save();

			IoBuffer	 Payload	 = Object.GetBuffer().AsIoBuffer();
			const IoHash PayloadHash = IoHash::HashBuffer(Payload);

			Cas.InsertChunk(Payload, PayloadHash);

			Keys[i] = PayloadHash;
		}

		VerifyAllChunks(Cas);
	}

	// Validate that we can still read the inserted data after closing
	// the original cas store

	{
		GcManager			 Gc;
		CasContainerStrategy Cas(Gc);
		Cas.Initialize(TempDir.Path(), "test", 65536, 16, false);

		VerifyAllChunks(Cas);
	}
}

// Verifies that the reported disk size equals the sum of the inserted chunk sizes, both right after
// inserting and after re-opening the store twice
TEST_CASE("compactcas.compact.totalsize")
{
	ScopedTemporaryDirectory TempDir;

	const uint64_t kChunkSize  = 1024;
	const int32_t  kChunkCount = 16;

	{
		GcManager			 Gc;
		CasContainerStrategy Cas(Gc);
		Cas.Initialize(TempDir.Path(), "test", 65536, 16, true);

		for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
		{
			IoBuffer			   Chunk		= CreateRandomBlob(kChunkSize);
			const IoHash		   Hash			= IoHash::HashBuffer(Chunk);
			CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, Hash);
			ZEN_ASSERT(InsertResult.New);
		}

		const uint64_t TotalSize = Cas.StorageSize().DiskSize;
		CHECK_EQ(kChunkSize * kChunkCount, TotalSize);
	}

	// Re-open the existing store
	{
		GcManager			 Gc;
		CasContainerStrategy Cas(Gc);
		Cas.Initialize(TempDir.Path(), "test", 65536, 16, false);

		const uint64_t TotalSize = Cas.StorageSize().DiskSize;
		CHECK_EQ(kChunkSize * kChunkCount, TotalSize);
	}

	// Re-open again, this time we should have a snapshot
	{
		GcManager			 Gc;
		CasContainerStrategy Cas(Gc);
		Cas.Initialize(TempDir.Path(), "test", 65536, 16, false);

		const uint64_t TotalSize = Cas.StorageSize().DiskSize;
		CHECK_EQ(kChunkSize * kChunkCount, TotalSize);
	}
}

// A single chunk that nothing retains must be gone after a garbage collection pass
TEST_CASE("compactcas.gc.basic")
{
	ScopedTemporaryDirectory TempDir;

	GcManager			 Gc;
	CasContainerStrategy Cas(Gc);
	Cas.Initialize(TempDir.Path(), "cb", 65536, 1 << 4, true);

	IoBuffer Blob	  = CreateRandomBlob(128);
	IoHash	 BlobHash = IoHash::HashBuffer(Blob);

	const CasStore::InsertResult Inserted = Cas.InsertChunk(Blob, BlobHash);
	CHECK(Inserted.New);
	Cas.Flush();

	// No cids are retained, so the chunk is eligible for collection
	GcContext Context(GcClock::Now() - std::chrono::hours(24), GcClock::Now() - std::chrono::hours(24));
	Context.CollectSmallObjects(true);

	Cas.CollectGarbage(Context);

	CHECK(!Cas.HaveChunk(BlobHash));
}

// A chunk written by one store instance must be collectable by a second instance opened on the same directory
TEST_CASE("compactcas.gc.removefile")
{
	ScopedTemporaryDirectory TempDir;

	IoBuffer Blob	  = CreateRandomBlob(128);
	IoHash	 BlobHash = IoHash::HashBuffer(Blob);
	{
		GcManager			 WriterGc;
		CasContainerStrategy Writer(WriterGc);
		Writer.Initialize(TempDir.Path(), "cb", 65536, 1 << 4, true);

		// The second insert of the same chunk must be reported as a duplicate
		const CasStore::InsertResult First = Writer.InsertChunk(Blob, BlobHash);
		CHECK(First.New);
		const CasStore::InsertResult Second = Writer.InsertChunk(Blob, BlobHash);
		CHECK(!Second.New);
		Writer.Flush();
	}

	GcManager			 Gc;
	CasContainerStrategy Cas(Gc);
	Cas.Initialize(TempDir.Path(), "cb", 65536, 1 << 4, false);

	GcContext Context(GcClock::Now() - std::chrono::hours(24), GcClock::Now() - std::chrono::hours(24));
	Context.CollectSmallObjects(true);

	Cas.CollectGarbage(Context);

	CHECK(!Cas.HaveChunk(BlobHash));
}

// Runs a series of GC passes over nine chunks with different retain sets and verifies after each pass
// that exactly the retained chunks survive and are still readable with the right content
TEST_CASE("compactcas.gc.compact")
{
	ScopedTemporaryDirectory TempDir;

	GcManager			 Gc;
	CasContainerStrategy Cas(Gc);
	Cas.Initialize(TempDir.Path(), "cb", 2048, 1 << 4, true);

	const size_t kChunkCount = 9;

	uint64_t			  ChunkSizes[9] = {128, 541, 1023, 781, 218, 37, 4, 997, 5};
	std::vector<IoBuffer> Chunks;
	Chunks.reserve(kChunkCount);
	for (uint64_t Size : ChunkSizes)
	{
		Chunks.push_back(CreateRandomBlob(Size));
	}

	std::vector<IoHash> ChunkHashes;
	ChunkHashes.reserve(kChunkCount);
	for (const IoBuffer& Chunk : Chunks)
	{
		ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
	}

	// All chunks are new on first insert and present afterwards
	for (size_t Index = 0; Index < kChunkCount; ++Index)
	{
		CHECK(Cas.InsertChunk(Chunks[Index], ChunkHashes[Index]).New);
	}
	for (size_t Index = 0; Index < kChunkCount; ++Index)
	{
		CHECK(Cas.HaveChunk(ChunkHashes[Index]));
	}

	auto ValidateChunkExists = [&](size_t Index) {
		IoBuffer Chunk	= Cas.FindChunk(ChunkHashes[Index]);
		bool	 Exists = !!Chunk;
		CHECK(Exists);
		IoHash Hash = IoHash::HashBuffer(Chunk);
		if (ChunkHashes[Index] != Hash)
		{
			CHECK(fmt::format("{}", ChunkHashes[Index]) == fmt::format("{}", Hash));
		}
	};

	// One GC pass: retain the chunks listed in Kept, collect, check presence of all nine chunks, validate
	// the content of the retained ones and finally insert the chunks listed in Reinsert again
	auto RunGcPass = [&](const std::vector<size_t>& Kept, const std::vector<size_t>& Reinsert) {
		GcContext Context(GcClock::Now() - std::chrono::hours(24), GcClock::Now() - std::chrono::hours(24));
		Context.CollectSmallObjects(true);

		std::vector<IoHash> KeepChunks;
		for (size_t Index : Kept)
		{
			KeepChunks.push_back(ChunkHashes[Index]);
		}
		Context.AddRetainedCids(KeepChunks);

		Cas.Flush();
		Cas.CollectGarbage(Context);

		for (size_t Index = 0; Index < kChunkCount; ++Index)
		{
			const bool ShouldExist = std::find(Kept.begin(), Kept.end(), Index) != Kept.end();
			CHECK(Cas.HaveChunk(ChunkHashes[Index]) == ShouldExist);
		}

		for (size_t Index : Kept)
		{
			ValidateChunkExists(Index);
		}

		for (size_t Index : Reinsert)
		{
			Cas.InsertChunk(Chunks[Index], ChunkHashes[Index]);
		}
	};

	// Keep first and last
	RunGcPass({0, 8}, {1, 2, 3, 4, 5, 6, 7});

	// Keep last (chunk 0 is deliberately not inserted again after this pass)
	RunGcPass({8}, {1, 2, 3, 4, 5, 6, 7});

	// Keep mixed
	RunGcPass({1, 4, 7}, {0, 2, 3, 5, 6, 8});

	// Keep multiple at end
	RunGcPass({6, 7, 8}, {0, 1, 2, 3, 4, 5});

	// Keep every other
	RunGcPass({0, 2, 4, 6, 8}, {1, 3, 5, 7});

	// Verify that we nicely appended blocks even after all GC operations
	for (size_t Index = 0; Index < kChunkCount; ++Index)
	{
		ValidateChunkExists(Index);
	}
}

// Collects every other chunk and verifies the surviving half is intact, both in the store that ran the
// GC and in a store re-opened on the same directory
TEST_CASE("compactcas.gc.deleteblockonopen")
{
	ScopedTemporaryDirectory TempDir;

	uint64_t			  ChunkSizes[20] = {128, 541, 311, 181, 218, 37, 4, 397, 5, 92, 551, 721, 31, 92, 16, 99, 131, 41, 541, 84};
	std::vector<IoBuffer> Chunks;
	Chunks.reserve(20);
	for (uint64_t Size : ChunkSizes)
	{
		Chunks.push_back(CreateRandomBlob(Size));
	}

	std::vector<IoHash> ChunkHashes;
	ChunkHashes.reserve(20);
	for (const IoBuffer& Chunk : Chunks)
	{
		ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
	}

	// Even indexed chunks must be present with matching content, odd indexed chunks must be gone
	auto VerifyOnlyEvenChunksRemain = [&](CasContainerStrategy& Store) {
		for (size_t i = 0; i < 20; i += 2)
		{
			CHECK(Store.HaveChunk(ChunkHashes[i]));
			CHECK(!Store.HaveChunk(ChunkHashes[i + 1]));
			CHECK(ChunkHashes[i] == IoHash::HashBuffer(Store.FindChunk(ChunkHashes[i])));
		}
	};

	{
		GcManager			 Gc;
		CasContainerStrategy Cas(Gc);
		Cas.Initialize(TempDir.Path(), "test", 1024, 16, true);

		for (size_t i = 0; i < 20; i++)
		{
			CHECK(Cas.InsertChunk(Chunks[i], ChunkHashes[i]).New);
		}

		// GC every other block
		{
			GcContext Context(GcClock::Now() - std::chrono::hours(24), GcClock::Now() - std::chrono::hours(24));
			Context.CollectSmallObjects(true);
			std::vector<IoHash> Retained;
			for (size_t i = 0; i < 20; i += 2)
			{
				Retained.push_back(ChunkHashes[i]);
			}
			Context.AddRetainedCids(Retained);

			Cas.Flush();
			Cas.CollectGarbage(Context);

			VerifyOnlyEvenChunksRemain(Cas);
		}
	}
	{
		// Re-open
		GcManager			 Gc;
		CasContainerStrategy Cas(Gc);
		Cas.Initialize(TempDir.Path(), "test", 1024, 16, false);

		VerifyOnlyEvenChunksRemain(Cas);
	}
}

// A buffer obtained from the store before a GC pass must still hold the right content after every chunk
// has been collected
TEST_CASE("compactcas.gc.handleopeniobuffer")
{
	ScopedTemporaryDirectory TempDir;

	const size_t kChunkCount = 20;

	uint64_t			  ChunkSizes[20] = {128, 541, 311, 181, 218, 37, 4, 397, 5, 92, 551, 721, 31, 92, 16, 99, 131, 41, 541, 84};
	std::vector<IoBuffer> Blobs;
	Blobs.reserve(kChunkCount);
	for (const uint64_t& Size : ChunkSizes)
	{
		Blobs.push_back(CreateRandomBlob(Size));
	}

	std::vector<IoHash> BlobHashes;
	BlobHashes.reserve(kChunkCount);
	for (const IoBuffer& Blob : Blobs)
	{
		BlobHashes.push_back(IoHash::HashBuffer(Blob.Data(), Blob.Size()));
	}

	GcManager			 Gc;
	CasContainerStrategy Cas(Gc);
	Cas.Initialize(TempDir.Path(), "test", 1024, 16, true);

	for (size_t Index = 0; Index < kChunkCount; Index++)
	{
		CHECK(Cas.InsertChunk(Blobs[Index], BlobHashes[Index]).New);
	}

	// Hold on to one chunk across the GC pass
	IoBuffer HeldChunk = Cas.FindChunk(BlobHashes[5]);
	Cas.Flush();

	// GC everything
	GcContext Context(GcClock::Now() - std::chrono::hours(24), GcClock::Now() - std::chrono::hours(24));
	Context.CollectSmallObjects(true);
	Cas.CollectGarbage(Context);

	for (size_t Index = 0; Index < kChunkCount; Index++)
	{
		CHECK(!Cas.HaveChunk(BlobHashes[Index]));
	}

	CHECK(BlobHashes[5] == IoHash::HashBuffer(HeldChunk));
}

// Stress test: inserts and reads chunks from worker threads while garbage collection runs repeatedly on
// the test thread, then verifies that every chunk that was not collected is still present and intact.
// Instantiated once per GC implementation via the GCV2 template parameter.
TEST_CASE_TEMPLATE("compactcas.threadedinsert", GCV2, FalseType, TrueType)
{
	ScopedTemporaryDirectory TempDir;

	const uint64_t kChunkSize	= 1048;
	const int32_t  kChunkCount	= 4096;
	uint64_t	   ExpectedSize = 0;

	std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> Chunks;
	Chunks.reserve(kChunkCount);

	// Generate kChunkCount chunks with unique hashes
	for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
	{
		while (true)
		{
			IoBuffer Chunk = CreateRandomBlob(kChunkSize);
			IoHash	 Hash  = IoHash::HashBuffer(Chunk);
			if (Chunks.contains(Hash))
			{
				continue;
			}
			Chunks[Hash] = Chunk;
			ExpectedSize += Chunk.Size();
			break;
		}
	}

	std::atomic<size_t>	 WorkCompleted = 0;
	WorkerThreadPool	 ThreadPool(4);
	GcManager			 Gc;
	CasContainerStrategy Cas(Gc);
	Cas.Initialize(TempDir.Path(), "test", 32768, 16, true);

	// Phase 1: insert all chunks from the thread pool
	{
		for (const auto& Chunk : Chunks)
		{
			const IoHash&	Hash   = Chunk.first;
			const IoBuffer& Buffer = Chunk.second;
			ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Buffer, Hash]() {
				CasStore::InsertResult InsertResult = Cas.InsertChunk(Buffer, Hash);
				ZEN_ASSERT(InsertResult.New);
				WorkCompleted.fetch_add(1);
			});
		}
		while (WorkCompleted < Chunks.size())
		{
			Sleep(1);
		}
	}

	WorkCompleted			 = 0;
	const uint64_t TotalSize = Cas.StorageSize().DiskSize;
	CHECK_LE(ExpectedSize, TotalSize);
	CHECK_GE(ExpectedSize + 32768, TotalSize);

	// Phase 2: read all chunks back from the thread pool
	{
		for (const auto& Chunk : Chunks)
		{
			// Capturing the map element by reference is fine, Chunks is not modified and we wait below
			ThreadPool.ScheduleWork([&Cas, &WorkCompleted, &Chunk]() {
				IoHash	 ChunkHash = Chunk.first;
				IoBuffer Buffer	   = Cas.FindChunk(ChunkHash);
				IoHash	 Hash	   = IoHash::HashBuffer(Buffer);
				CHECK(ChunkHash == Hash);
				WorkCompleted.fetch_add(1);
			});
		}
		while (WorkCompleted < Chunks.size())
		{
			Sleep(1);
		}
	}

	// GcChunkHashes tracks every chunk we expect to be present once all GC passes are done
	std::unordered_set<IoHash, IoHash::Hasher> GcChunkHashes;
	GcChunkHashes.reserve(Chunks.size());
	for (const auto& Chunk : Chunks)
	{
		GcChunkHashes.insert(Chunk.first);
	}

	// Phase 3: insert new chunks and read old ones from the pool while GC runs on this thread
	{
		WorkCompleted = 0;
		std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> NewChunks;
		NewChunks.reserve(kChunkCount);

		for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
		{
			IoBuffer Chunk	= CreateRandomBlob(kChunkSize);
			IoHash	 Hash	= IoHash::HashBuffer(Chunk);
			NewChunks[Hash] = Chunk;
			GcChunkHashes.insert(Hash);
		}

		// Explicitly start at zero rather than relying on the default constructor
		std::atomic_uint32_t AddedChunkCount{0};

		for (const auto& Chunk : NewChunks)
		{
			ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Chunk, &AddedChunkCount]() {
				Cas.InsertChunk(Chunk.second, Chunk.first);
				AddedChunkCount.fetch_add(1);
				WorkCompleted.fetch_add(1);
			});
		}
		for (const auto& Chunk : Chunks)
		{
			ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Chunk]() {
				IoHash	 ChunkHash = Chunk.first;
				IoBuffer Buffer	   = Cas.FindChunk(ChunkHash);
				// The chunk may already have been collected, only validate content if it is still there
				if (Buffer)
				{
					CHECK(ChunkHash == IoHash::HashBuffer(Buffer));
				}
				WorkCompleted.fetch_add(1);
			});
		}

		std::unordered_set<IoHash, IoHash::Hasher> ChunksToDelete;
		std::vector<IoHash>						   KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end());

		// Move a sparse selection of hashes from the keep list to the delete set (swap with last + pop)
		size_t C = 0;
		while (C < KeepHashes.size())
		{
			if (C % 155 == 0)
			{
				if (C < KeepHashes.size() - 1)
				{
					ChunksToDelete.insert(KeepHashes[C]);
					KeepHashes[C] = KeepHashes[KeepHashes.size() - 1];
					KeepHashes.pop_back();
				}
				if (C + 3 < KeepHashes.size() - 1)
				{
					ChunksToDelete.insert(KeepHashes[C + 3]);
					KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1];
					KeepHashes.pop_back();
				}
			}
			C++;
		}

		// Runs one GC pass retaining KeepHashes. Every chunk reported as deleted must be one we selected
		// for deletion and is removed from GcChunkHashes.
		auto DoGC = [](CasContainerStrategy&							 Cas,
					   const std::unordered_set<IoHash, IoHash::Hasher>& ChunksToDelete,
					   const std::vector<IoHash>&						 KeepHashes,
					   std::unordered_set<IoHash, IoHash::Hasher>&		 GcChunkHashes) {
			if (GCV2::Enabled)
			{
				std::atomic_bool IsCancelledFlag = false;
				GcCtx			 Ctx			 = {.Settings		 = {.CacheExpireTime				   = GcClock::Now() - std::chrono::hours(24),
																		.ProjectStoreExpireTime			   = GcClock::Now() - std::chrono::hours(24),
																		.CollectSmallObjects			   = true,
																		.IsDeleteMode					   = true,
																		.CompactBlockUsageThresholdPercent = 100},
													.IsCancelledFlag = IsCancelledFlag};

				GcReferenceStoreStats PrunerStats;
				GcReferencePruner*	  Pruner = Cas.CreateReferencePruner(Ctx, PrunerStats);
				CHECK(Pruner);
				if (!Pruner)
				{
					// CHECK does not abort the test case, so avoid dereferencing a null pruner below
					return;
				}
				// The pruner is allocated with new and handed to us, release it when this pass is done
				const auto PrunerCleanup = MakeGuard([Pruner] { delete Pruner; });

				HashKeySet		  Deleted;
				GcStats			  Stats;
				GcStoreCompactor* Compactor =
					Pruner->RemoveUnreferencedData(Ctx, Stats, [&](std::span<IoHash> References) -> std::vector<IoHash> {
						std::vector<IoHash> Unreferenced;
						HashKeySet			Retain;
						Retain.AddHashesToSet(KeepHashes);
						for (const IoHash& ChunkHash : References)
						{
							if (!Retain.ContainsHash(ChunkHash))
							{
								Unreferenced.push_back(ChunkHash);
							}
						}
						Deleted.AddHashesToSet(Unreferenced);
						return Unreferenced;
					});
				// Same ownership rule for the compactor; deleting a null pointer is a no-op
				const auto CompactorCleanup = MakeGuard([Compactor] { delete Compactor; });

				if (Compactor)
				{
					Deleted.IterateHashes([&GcChunkHashes, &ChunksToDelete](const IoHash& ChunkHash) {
						CHECK(ChunksToDelete.contains(ChunkHash));
						GcChunkHashes.erase(ChunkHash);
					});
					GcCompactStoreStats CompactStats;
					Compactor->CompactStore(Ctx, CompactStats, []() { return 0; });
				}
			}
			else
			{
				GcContext GcCtx(GcClock::Now() - std::chrono::hours(24), GcClock::Now() - std::chrono::hours(24));
				GcCtx.CollectSmallObjects(true);
				GcCtx.AddRetainedCids(KeepHashes);
				Cas.CollectGarbage(GcCtx);
				const HashKeySet& Deleted = GcCtx.DeletedCids();
				Deleted.IterateHashes([&GcChunkHashes, &ChunksToDelete](const IoHash& ChunkHash) {
					CHECK(ChunksToDelete.contains(ChunkHash));
					GcChunkHashes.erase(ChunkHash);
				});
			}
		};

		// Keep collecting for as long as the workers are still inserting new chunks
		while (AddedChunkCount.load() < NewChunks.size())
		{
			DoGC(Cas, ChunksToDelete, KeepHashes, GcChunkHashes);
		}

		while (WorkCompleted < NewChunks.size() + Chunks.size())
		{
			Sleep(1);
		}

		// One final pass once all workers are done
		DoGC(Cas, ChunksToDelete, KeepHashes, GcChunkHashes);
	}

	// Phase 4: everything that was not collected must still be present with the right content
	{
		WorkCompleted = 0;
		for (const IoHash& ChunkHash : GcChunkHashes)
		{
			ThreadPool.ScheduleWork([&Cas, &WorkCompleted, ChunkHash]() {
				CHECK(Cas.HaveChunk(ChunkHash));
				CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash)));
				WorkCompleted.fetch_add(1);
			});
		}
		while (WorkCompleted < GcChunkHashes.size())
		{
			Sleep(1);
		}
	}
}

#endif

// Intentionally empty.
// NOTE(review): presumably referenced from elsewhere so that the linker keeps this translation unit and
// the test cases registered in it - confirm against the callers.
void
compactcas_forcelink()
{
}

}  // namespace zen
